#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "yid.h"
#include "fileinfo.h"
#include "volume_proto.h"
#include "md_map.h"
#include "vnode.h"
#include "stor_rpc.h"
#include "net_global.h"
#include "volume_ctl.h"

#include "lsv_lib.h"
#include "lsv_volume_proto.h"
#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"
#include "lsv_rcache.h"

#define CORRUPT_DEBUG   1

#ifdef CORRUPT_DEBUG

int *xmap = NULL;
extern uint32_t lsv_feature;

void lsv_info_print_io_entry(lsv_volume_proto_t *lsv_info, lsv_io_entry_t *io_entry);

/* Debug page-checksum map: 100 MiB worth of int slots, indexed by page number. */
#define XMAP_BYTES      (100 * 1024 * 1024)
#define XMAP_ENTRIES    (XMAP_BYTES / sizeof(int))

/**
 * Lazily allocate the debug checksum map used by mycrc().
 *
 * The map is zero-filled so that verifying a page that was never recorded
 * compares against 0 deterministically instead of against uninitialised heap
 * memory.  If the allocation fails xmap stays NULL; mycrc() then still
 * computes checksums but neither records nor verifies them.
 */
void init_xmap(void)
{
        if (xmap)
                return;

        xmap = calloc(XMAP_ENTRIES, sizeof(int));
}

/**
 * XOR-fold one PAGE_SIZE page of @buf into an int checksum (this is not a
 * real CRC) and record it in, or verify it against, xmap.
 *
 * @param npage  page index into xmap; indices outside the map are neither
 *               recorded nor verified (they used to index out of bounds)
 * @param buf    at least PAGE_SIZE bytes, read as an array of int
 * @param read   0: record the checksum for @npage;
 *               non-zero: assert that it matches the recorded one
 * @return the computed checksum
 */
int mycrc(int npage, void *buf, int read)
{
        const int *words = buf;
        int x = 0;

        if (!xmap)
                init_xmap();

        for (size_t i = 0; i < PAGE_SIZE / sizeof(int); i++)
                x ^= words[i];

        // No map (allocation failed) or index outside it: nothing to record/verify.
        if (!xmap || npage < 0 || (size_t)npage >= XMAP_ENTRIES)
                return x;

        if (!read)
                xmap[npage] = x;
        else if (xmap[npage] != x)
                assert(0);

        return x;
}

#endif

/**
 * Read snapshot data for @fileid through the node that serves @parent.
 *
 * io->id is overwritten with *fileid before dispatch.  The request goes to
 * volume_ctl_snapshot_read() when that node is local, otherwise to
 * stor_rpc_snapshot_read().  On EREMCHG, vnode_reload() is called for @parent
 * before the error is returned.
 *
 * @return 0 on success, otherwise _errno() of the failing step.
 */
int _volume_proto_read_snapshot_lsv(const char *pool, const fileid_t *parent, const fileid_t *fileid, lsv_io_t *io, buffer_t *buf)
{
        nid_t srv;
        int ret;

        DINFO("_volume_proto_read_snapshot_lsv\n");

        ret = md_map_getsrv(parent, &srv);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io->id = *fileid;

        if (!net_islocal(&srv))
                ret = stor_rpc_snapshot_read(&srv, parent, (io_t *)io, buf);
        else
                ret = volume_ctl_snapshot_read(parent, (io_t *)io, buf);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("_volume_proto_read_snapshot_lsv end\n");
        return 0;

err_ret:
        if (ret == EREMCHG)
                vnode_reload(pool, parent);

        return _errno(ret);
}

/**
 * Read io->size bytes of metadata for snapshot @snap of @fileid into the flat
 * buffer @buf, through the node that serves @parent.
 *
 * io->id is overwritten with *fileid.  The data is first collected in a
 * temporary buffer_t (local volume_ctl call or stor_rpc call), then copied
 * out with mbuffer_get().  The temporary buffer is released on every path.
 * On EREMCHG, vnode_reload() is called for @parent.
 *
 * @return 0 on success, otherwise _errno() of the failing step.
 */
int _volume_proto_read_snapshot_lsv_meta(const char *pool, const fileid_t *parent, const fileid_t *fileid, io_t *io, const char *snap, void *buf)
{
        buffer_t tmp;
        nid_t srv;
        int ret;

        mbuffer_init(&tmp, 0);

        ret = md_map_getsrv(parent, &srv);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io->id = *fileid;

        if (!net_islocal(&srv))
                ret = stor_rpc_snapshot_read_meta(&srv, parent, io, snap, &tmp);
        else
                ret = volume_ctl_snapshot_read_meta(parent, io, snap, &tmp);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        mbuffer_get(&tmp, buf, io->size);
        mbuffer_free(&tmp);
        return 0;

err_ret:
        if (ret == EREMCHG)
                vnode_reload(pool, parent);

        mbuffer_free(&tmp);
        return _errno(ret);
}


/**
 * Issue a lower-layer read of @io against volume @parent, appending to @buf.
 *
 * io->id is overwritten with *parent.  The request goes to
 * volume_ctl_lower_read() when the serving node is local, otherwise to
 * stor_rpc_lower_read().  On EREMCHG, vnode_reload() is called for @parent.
 *
 * @return 0 on success, otherwise _errno() of the failing step.
 */
int _volume_proto_read_lower_lsv(const char *pool, const fileid_t *parent, io_t *io, buffer_t *buf)
{
        nid_t srv;
        int ret;

        DINFO("_volume_proto_read_lower_lsv\n");

        ret = md_map_getsrv(parent, &srv);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io->id = *parent;

        if (!net_islocal(&srv))
                ret = stor_rpc_lower_read(&srv, parent, io, buf);
        else
                ret = volume_ctl_lower_read(parent, io, buf);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("_volume_proto_read_lower_lsv end\n");
        return 0;

err_ret:
        if (ret == EREMCHG)
                vnode_reload(pool, parent);

        return _errno(ret);
}

/**
 * Read @size bytes starting at @chunk_off of chunk @chunk_id in volume
 * @volume_id into the flat memory @buffer.
 *
 * The range is read in pieces of at most PAGE_SIZE bytes, one
 * _volume_proto_read_lower_lsv() call per piece.  Each piece is collected in a
 * temporary buffer_t, which is released on both the success and error paths.
 *
 * NOTE(review): lsv_io_init() is called here with an io_t but elsewhere in
 * this file with an lsv_io_t -- confirm the two types are layout-compatible.
 *
 * @param buffer  destination of at least @size bytes
 * @return 0 on success, otherwise the error from _volume_proto_read_lower_lsv()
 */
int volume_proto_remote_read_chunk(lsv_volume_proto_t *lsv_info, uint64_t volume_id, uint64_t chunk_id, uint32_t chunk_off, uint32_t size, void *buffer)
{
        int ret;
        io_t io;
        uint32_t _size, left, off;
        buffer_t _buf;
        fileid_t fileid;
        char *dst = buffer;     // byte pointer: arithmetic on void * is a GNU extension

        // TODO volume_id == 0?
        YASSERT(volume_id > 0);

        mk_volid(&fileid, volume_id);

        off = 0;
        left = size;
        while (left > 0) {
                _size = left < PAGE_SIZE ? left : PAGE_SIZE;

                mbuffer_init(&_buf, 0);

                lsv_io_init(&io, &fileid, chunk_id, chunk_off + off, _size, 0);

                ret = _volume_proto_read_lower_lsv(lsv_info->volume_proto->table1.pool, &fileid, &io, &_buf);
                if (unlikely(ret)) {
                        // Release anything the failed read may have appended.
                        mbuffer_free(&_buf);
                        GOTO(err_ret, ret);
                }

                mbuffer_get(&_buf, dst + off, _size);

                mbuffer_free(&_buf);

                off += _size;
                left -= _size;
        }

        return 0;
err_ret:
        return ret;
}


/**
 * Fetch the bitmap header of snapshot @snap of @fileid into @header.
 *
 * The header is sizeof(lsv_bitmap_header_t) plus
 * BITMAP_CHUNK_HEADER_SIZE(lsv_info->size) bytes long.  It is read in pieces
 * of at most PAGE_SIZE bytes via _volume_proto_read_snapshot_lsv_meta().
 *
 * @return 0 on success, otherwise the error of the first failing piece.
 */
int volume_proto_read_header(lsv_volume_proto_t *lsv_info, const fileid_t *parent, const fileid_t *fileid, const char *snap, void *header)
{
        char *dst = header;
        uint32_t pos = 0;
        uint32_t remaining = sizeof(lsv_bitmap_header_t) + BITMAP_CHUNK_HEADER_SIZE(lsv_info->size);
        uint32_t step;
        io_t io;
        int ret;

        for (; remaining > 0; pos += step, remaining -= step) {
                step = remaining < PAGE_SIZE ? remaining : PAGE_SIZE;

                io_init(&io, fileid, NULL, pos, step, 0);
                ret = _volume_proto_read_snapshot_lsv_meta(lsv_info->volume_proto->table1.pool, parent, fileid, &io, snap, dst + pos);
                if (ret)
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        DERROR("read header failed, %x\r\n", ret);
        return ret;
}

/**
 * Read one (possibly partial) LSV page and append it to @append_buf.
 *
 * @offset/@size describe a byte range that must lie inside a single
 * LSV_PAGE_SIZE page.  Sources are tried in this order:
 *   1. the write buffer, when LSV_FEATURE_WBUF_READ is enabled;
 *   2. the LBA-keyed read cache (lsv_rcache_page_lookup);
 *   3. the bitmap.  It maps the LBA to one of three things:
 *      - a parent volume, read via _volume_proto_read_snapshot_lsv();
 *      - a local chunk, read via lsv_rcache_lookup();
 *      - nothing (chunk_id 0).
 * Whatever part of @size the chosen source did not supply is zero-filled, so
 * on success exactly @size bytes are appended.  A failed parent-volume read is
 * logged and its page is zero-filled (best effort, as before).
 *
 * This function takes no lock itself.  lsv_volume_proto_read() calls it with
 * the per-LBA lock_table read lock held.
 *
 * @return @size on success; a negative value on failure (bitmap lookup or
 *         chunk read-cache error).  A positive code from the bitmap lookup is
 *         negated.  Callers test "< 0", and a positive value would otherwise
 *         be mistaken for a byte count.
 */
int lsv_page_read_nolock(lsv_volume_proto_t *volume_proto, uint64_t offset, uint32_t size, raw_io_t * rio, buffer_t *append_buf) {
        int ret = 0;
        lsv_u32_t chunk_id = 0, chunk_offset = 0;
        lsv_u64_t volume_id = 0;        // 0 means the current volume.

        YASSERT(size<=LSV_PAGE_SIZE);

        uint64_t lba = OFF2LBA(offset);
        uint32_t page_before = offset % LSV_PAGE_SIZE;

        // The range must not cross a page boundary.  The old check
        // (before + size + (PAGE - before - size) == PAGE) was always true
        // because of unsigned wraparound.
        YASSERT(page_before + size <= LSV_PAGE_SIZE);

        PRINT_PAGE(offset, size, 0, "");

        // NOTE: counts every page read, not only write-buffer hits;
        // read_wbuf_miss below counts the misses.
        volume_proto->read_wbuf_hit++;

        if (lsv_feature & LSV_FEATURE_WBUF_READ) {
                ret = lsv_wbuffer_page_read(volume_proto, offset, size, append_buf);
                if (ret > 0) {
                        DINFO("wbuffer_readpage ret %u\n", ret);
                        goto out;
                }
        }

        PRINT_PAGE(offset, size, ret, "wbuf miss");
        volume_proto->read_wbuf_miss++;

        // Before consulting the bitmap, look the LBA up in the read cache.
        ret = lsv_rcache_page_lookup(volume_proto, offset, size, append_buf);
        if(ret > 0){
                DINFO("rcache_page_lookup ret %d\n", ret);
                goto out;
        }

        PRINT_PAGE(offset, size, ret, "lsv_rcache_page_lookup miss");
        volume_proto->read_page_miss++;

        ret = lsv_bitmap_paged_lookup(volume_proto, lba, &volume_id, &chunk_id, &chunk_offset);

        // TODO: what does a result of <0, 0> mean?
        DINFO("refmap read lba %ju chunk_id %u page %d volume_id %ju ret %d\n",
              lba, chunk_id, CHUNKOFF2PAGEIDX(chunk_offset), volume_id, ret);

        if (unlikely(ret)) {
                DERROR("bitmap_paged_lookup err.errno:%d\n", ret);
                goto err_ret;
        }

        // chunk_offset is only meaningful once the lookup has succeeded.
        YASSERT(chunk_offset % LSV_PAGE_SIZE == 0);

        if (volume_id && volume_id != volume_proto->ino) {
                // The page belongs to another (parent) volume.
                lsv_io_t io;
                fileid_t fileid;
                mk_volid(&fileid, volume_id);

                DINFO("lsv_page_read_nolock readparent\n");

                lsv_io_init(&io, &fileid, chunk_id, chunk_offset + (offset % LSV_PAGE_SIZE), size, 0);
                ret = _volume_proto_read_snapshot_lsv(volume_proto->volume_proto->table1.pool, &fileid, &fileid, &io, append_buf);
                if (ret == 0) {
                        ret = io.size;
                } else {
                        // Best effort: the page is zero-filled below.  Log it
                        // so a failed parent read is not silent.
                        DERROR("read parent volume %ju failed, ret %d\n", volume_id, ret);
                        ret = 0;
                }

                DINFO("lsv_page_read_nolock return %d\n", ret);

        } else if (chunk_id > 0) {
                ret = lsv_rcache_lookup(volume_proto, chunk_id, chunk_offset, offset, size, rio, append_buf);
                if (ret < 0) {
                        DERROR("rcache_lookup err. ret %d\n", ret);
                        goto err_ret;
                }
        } else {
                DINFO("read:chunk_id ==== 0\n");
                ret = 0;
        }

out:
        // ret >= 0 here: the number of bytes the source appended.
        if ((uint32_t)ret < size) {
                PRINT_PAGE(offset, size, ret, "");
                mbuffer_appendzero(append_buf, size - ret);
        }

#ifdef __PAGE_CRC32__
        // Skip the CRC for partial-page reads.  The CRC is taken from the start
        // of append_buf.  That buffer accumulates pages in a multi-page read,
        // so the result always reflects the first page; it is inaccurate.
        // NOTE(review): "offset == lba" compares a byte offset with an LBA;
        // confirm OFF2LBA's units.
        uint32_t crc = (offset == lba && size == LSV_PAGE_SIZE) ? mbuffer_crc(append_buf, 0, size) : 0;
#else
        uint32_t crc = 0;
#endif
        DINFO("end lba %ju crc %u <%ju,%u> volume_id %ju chunk_id %u ret %d\n",
              lba,
              crc,
              offset,
              size,
              volume_id,
              chunk_id,
              ret);
        return size;

err_ret:
        // Normalise to a negative code so callers checking "< 0" see the error.
        return ret > 0 ? -ret : ret;
}

/**
 * Read io->size bytes at io->offset of an LSV volume, appending them to @buf.
 *
 * The request is split at LSV_PAGE_SIZE boundaries.  Each piece is read by
 * lsv_page_read_nolock() while the lock_table read lock for its LBA is held.
 * If the pieces yield fewer than io->size bytes in total, the rest is
 * zero-filled.  lsv_info_stat() runs on both the success and error paths.
 *
 * @return 0 on success, or the negative value returned by the failing page read.
 */
int lsv_volume_proto_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf) {
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        lsv_u64_t pos = io->offset;
        lsv_u32_t remaining = io->size;
        lsv_u32_t nread = 0;
        raw_io_t rio;
        int rc = 0;

        raw_io_init(&rio, io->offset, io->size);

        DINFO("begin io read <%ju,%u>\n", pos, remaining);

        const int total = lsv_page_num(pos, remaining);
        for (int idx = 0; remaining > 0; idx++) {
                lsv_u32_t piece = LSV_PAGE_SIZE - pos % LSV_PAGE_SIZE;
                piece = _min(piece, remaining);

                lsv_u64_t lba = OFF2LBA(pos);

                DINFO("read %d/%d ino %ju lba %ju io %p\n", idx, total, lsv_info->ino, lba, io);

                ltable_rdlock(&lsv_info->lock_table, lba);
                rc = lsv_page_read_nolock(lsv_info, pos, piece, &rio, buf);
                ltable_unlock(&lsv_info->lock_table, lba);

                if (rc < 0) {
                        DERROR("readpage_lsv err.errno:%d\n", rc);
                        goto out;
                }

                nread += rc;
                pos += piece;
                remaining -= piece;
        }

        if (nread < io->size)
                mbuffer_appendzero(buf, io->size - nread);

        rc = 0;
out:
        lsv_info_stat(lsv_info);
        DINFO("read:exit:%d\n", nread);
        return rc;
}


/**
 * Write @io into the write buffer, one page fragment at a time.
 *
 * Each fragment covers at most the rest of the LSV page that contains its
 * start offset.  It is described by a freshly allocated wbuf_io_frag_t and
 * handed to lsv_page_write().  lsv_page_write() is expected to consume exactly
 * the fragment's bytes from @buf (asserted), so @buf is empty on success.
 *
 * NOTE(review): on a lsv_page_write() error the fragment is not freed here --
 * confirm lsv_page_write() takes ownership in all cases.
 *
 * @todo concurrency
 *
 * @param lsv_info  volume being written
 * @param io        the write request (offset, size)
 * @param buf       payload; must hold exactly io->size bytes
 * @return 0 on success, otherwise the error returned by lsv_page_write()
 */
int volume_proto_mem_append_io(lsv_volume_proto_t *lsv_info, const io_t *io, buffer_t *buf) {
        lsv_u64_t pos = io->offset;
        lsv_u32_t remaining = io->size;
        int ret = 0;

        YASSERT(io->size == buf->len);

        while (remaining > 0) {
                wbuf_io_frag_t *frag;
                lsv_u32_t len;
                int len_before;

                // TODO: splitting into pages -- is this a serial process?
                len = LSV_PAGE_SIZE - pos % LSV_PAGE_SIZE;
                len = _min(remaining, len);

                ret = ymalloc((void **)&frag, sizeof(wbuf_io_frag_t));
                if (unlikely(ret))
                        YASSERT(0);

                frag->lsv_info = lsv_info;
                frag->io = io;
                frag->buf = buf;
                frag->lba = OFF2LBA(pos);
                frag->off = pos;
                frag->size = len;
                frag->is_fill = 0;

                len_before = buf->len;

                ret = lsv_page_write(frag);
                if (unlikely(ret)) {
                        DERROR("writepage_lsv err.errno:%d\n", ret);
                        GOTO(err_ret, ret);
                }

                // lsv_page_write() must have popped exactly this fragment.
                YASSERT(buf->len == len_before - len);
                DINFO("pop %p %u - %u == %u\n", buf, len_before, buf->len, len);

                pos += len;
                remaining -= len;
        }

        YASSERT(buf->len == 0);
        YASSERT(remaining == 0);
        return 0;
err_ret:
        return ret;
}

/*
 * write_data_buffer() is compiled out (#if 0) and kept for reference only.
 * It split an IO at the free space left in the current wbuf segment and
 * appended each piece through volume_proto_mem_append_io().
 * NOTE(review): "free_size >= 0" is always true for a uint32_t.
 */
#if 0
int write_data_buffer(lsv_volume_proto_t *lsv_info, const io_t *io, buffer_t *buf) {
        int ret;
        lsv_u64_t off;
        lsv_u32_t left, step;

        lsv_wbuf_segment_t *segment = get_current_segment(lsv_info);
        uint32_t free_size = (uint32_t)lsv_wbuffer_segment_get_free_size(segment);

        YASSERT(free_size >= 0 && free_size % LSV_PAGE_SIZE == 0);
        DINFO("free_size %u\n", free_size);

        left = io->size;
        off = io->offset;
        while (left > 0) {
                // Split by segment: one IO may span more than one segment.
                if (free_size > 0) {
                        // maybe split io
                        step = _min(left, free_size - (uint32_t)(io->offset % LSV_PAGE_SIZE));
                        free_size = 0;
                } else {
                        step = left;
                }

                io_t temp_io;
                UNIMPLEMENTED(__DUMP__);//vclock need update
                io_init(&temp_io, NULL, io->clock, off, step, io->flags);
                temp_io.lsn = io->lsn;
                ret = volume_proto_mem_append_io(lsv_info, &temp_io, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                left -= step;
                off += step;
        }

        return 0;
err_ret:
        return ret;
}
#endif

/**
 * Apply write-buffer QoS throttling before an IO is accepted.
 *
 * Asks lsv_wbuf_qos_wait() for a delay based on the wait-list length and the
 * number of pages @io touches.  Sleeps for that delay when it is positive,
 * then records the pages with lsv_wbuf_qos_update().  Always returns 0.
 */
int _wbuf_qos(lsv_volume_proto_t *lsv_info, const io_t *io) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
#if 0
        // TODO QOS
        // flow in/out
        int ms = 0;
        if (wbuf->work_list.count >= LSV_WBUF_CHUNK_GROUP_MAX * 6 / 8) {
                ms = 8;
        } else if (wbuf->work_list.count >= LSV_WBUF_CHUNK_GROUP_MAX * 5 / 8) {
                ms = 4;
        } else if (wbuf->work_list.count >= LSV_WBUF_CHUNK_GROUP_MAX * 4 / 8) {
                ms = 2;
        } else if (wbuf->work_list.count >= LSV_WBUF_CHUNK_GROUP_MAX * 3 / 8) {
                ms = 1;
        }

        if (ms > 0) {
                DERROR("flow %d ms %d\n", 0, ms);
                schedule_sleep("append_io", ms * 1000);
        }
#else
        const int pages = lsv_page_num(io->offset, io->size);
        int64_t wait_for;

        lsv_wbuf_qos_wait(&wbuf->qos, wbuf->wait_list.count, pages, &wait_for);
        if (wait_for > 0)
                schedule_sleep("append_io", wait_for);

        lsv_wbuf_qos_update(&wbuf->qos, pages, 0);
#endif

        return 0;
}

/*
 * Per-IO context for the io_mq write callbacks __do_write_io and
 * __do_write_io_fill, which are currently compiled out (#if 0).
 * NOTE(review): the struct tag begins with "__", an identifier prefix that C
 * reserves for the implementation.
 */
typedef struct __io_mq_ctx_t {
        lsv_volume_proto_t *lsv_info;
        // for normal: read by __do_write_io (plain append path)
        io_t *io;
        buffer_t *buf;
        // for fill: read by __do_write_io_fill (pre-allocated wbuf path)
        wbuf_io_head_t *io_head;
        // for sync: broadcast by the callback, waited on by the submitter
        co_cond_t cond;
} io_mq_ctx_t;

/*
 * Compiled-out (#if 0) io_mq callbacks.  Each performs one write step for an
 * io_mq_ctx_t and then calls co_cond_broadcast() on ioctx->cond.
 * NOTE(review): on error neither callback broadcasts cond, so a submitter
 * blocked in co_cond_wait() on it would not be woken.
 */
#if 0
static int __do_write_io(void *arg) {
        int ret;
        io_mq_ctx_t *ioctx = arg;

        ret = volume_proto_mem_append_io(ioctx->lsv_info, ioctx->io, ioctx->buf);
        if (unlikely(ret)) {
                GOTO(ERR1, ret);
        }

        co_cond_broadcast(&ioctx->cond, ret);
        return 0;
ERR1:
        return ret;
}

static int __do_write_io_fill(void *arg) {
        int ret;
        io_mq_ctx_t *ioctx = arg;

        // Placed after the WAL append -- is that an essential requirement of sequential consistency?
        ret = lsv_wbuffer_chunk_fill(ioctx->lsv_info, ioctx->io_head);
        if (unlikely(ret)) {
                GOTO(ERR1, ret);
        }

        co_cond_broadcast(&ioctx->cond, ret);
        return 0;
ERR1:
        return ret;
}
#endif

/**
 * Perform one LSV write: assign its LSN, optionally log it to the WAL, and
 * put the data into the write buffer.
 *
 * @todo Move operations that may yield to the function entry.
 * @todo Guarantee sequential consistency of IOs: nothing is visible before the write returns.
 * @todo Make the index update and the write completion atomic (not interrupted).
 * @todo Concurrent operations on the same LBA (512+512, 512+4k, 4k+4k)
 * @todo if io->size is large, i.e. > 1M
 *
 * Basic flow of a concurrent IO:
 * - wait until all preconditions hold (the check must not be interrupted; condition variable)
 * - assign the LSN and pre-allocate space (serial, must not be interrupted)
 * - execute (concurrently)
 * - update the index to make the data visible (must not be interrupted)
 * - complete the write
 *
 * With LSV_FEATURE_WAL2 the payload is copied into a page-padded flat buffer
 * (wal_io_head_t + data + zero padding) and the LSN comes from the WAL.
 * Without it, the LSN is ++wbuf->last_lsn.  io->lsn is set in both cases.
 *
 * @pre wbuf has enough free space and the WAL has enough free space
 * @param lsv_info
 * @param io
 * @param buf
 * @return 0 on success, otherwise the error from the WAL or write-buffer step.
 *         NOTE(review): with WBUF_USE_PREPARE, a failure after
 *         lsv_wbuffer_chunk_prepare() frees only wbuf_io_head.  Confirm that
 *         the reserved wbuf space is reclaimed elsewhere.
 */
int volume_proto_write_lsv_i(lsv_volume_proto_t *lsv_info, io_t *io, buffer_t *buf) {
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        char *flatbuf = NULL;
        wal_io_head_t *wal_io_head = NULL;
        wbuf_io_head_t *wbuf_io_head = NULL;
        // struct timeval t1, t2, t3, t4, t5;
        // int64_t used1, used2, used3, used4;

        lsv_info->share.write_enter_count++;

        // _gettimeofday(&t1, NULL);

        // stage 1: wait
        lsv_wbuffer_wait_io_write_pre(lsv_info, io);

        // _gettimeofday(&t2, NULL);
        // used1 = _time_used(&t1, &t2);

        lsv_info->input_bytes += io->size;

        // stage 2: prepare -- inherently serialized, must not be interrupted
        {
                // From here on the IO can be assigned an LSN, satisfying COCSR
                // (commitment-ordered conflict serializability).
                // Both wbuf and WAL must keep this order so that wbuf and
                // bitmap/log lookups stay consistent.
                // This must also be synchronized with background tasks (GC).

                if (lsv_feature & LSV_FEATURE_WAL2) {
                        uint32_t buflen;
                        uint32_t padding;

                        // Round header + payload up to a whole number of LSV pages.
                        buflen = sizeof(wal_io_head_t) + io->size;
                        if (buflen % LSV_PAGE_SIZE != 0) {
                                padding = LSV_PAGE_SIZE - buflen % LSV_PAGE_SIZE;
                                buflen += padding;
                        } else {
                                padding = 0;
                        }

                        // @malloc ERR1
                        ret = ymalloc((void **)&flatbuf, buflen);
                        if (unlikely(ret)) {
                                YASSERT(0);
                        }

                        // flatbuf := header + data + padding
                        wal_io_head = (void *)flatbuf;
                        mbuffer_get(buf, flatbuf + sizeof(wal_io_head_t), io->size);
                        if (padding) {
                                memset(flatbuf + buflen - padding, 0, padding);
                        }

                        // TODO WAL LSNs must be ordered, otherwise recovery may misjudge.
                        // An IO with a larger LSN may reach this point first!!!
                        // No yield allowed here: a coroutine switch would reorder LSNs.
                        ret = lsv_wal2_segment_append_prepare(lsv_info, io, wal_io_head);
                        if (unlikely(ret)) {
                                DERROR("ret %d\n", ret);
                                GOTO(ERR1, ret);
                        }

                        io->lsn = wal_io_head->lsn;
                } else {
                        io->lsn = ++wbuf->last_lsn;
                }

#if WBUF_USE_PREPARE
                // Pre-allocate wbuf space, split into pages.
                // Pages within one IO are contiguous, and successive IOs are
                // monotonically increasing in LSN.

                // @malloc ERR2
                lsv_wbuffer_chunk_prepare(lsv_info, io, buf, &wbuf_io_head);
                YASSERT(wbuf_io_head != NULL);
#endif

                // At this point the IO's LSN is fixed.  Sequential consistency is
                // defined by IO completion order, not entry order.
                // The later wbuf and bitmap commits must preserve this order.
                // Nothing has been interrupted so far, so LSN order still holds here.
                DINFO_NOP("ino %ju __last_lsn %ju lsn %ju\n", lsv_info->ino, wbuf->__last_lsn, io->lsn);

#if DEBUG_MODE
                YASSERT(io->lsn == wbuf->last_lsn);
                if (wbuf->__last_lsn != 0) {
                        YASSERT(io->lsn == wbuf->__last_lsn + 1);
                }
                wbuf->__last_lsn = io->lsn;
#endif
        }

        // _gettimeofday(&t3, NULL);
        // used2 = _time_used(&t2, &t3);

        // stage 3: just do it -- may run in parallel (fork and join pattern)

        // 4k alignment and segment malloc may yield, which reorders LSNs.
        // So at commit time it is not guaranteed that all data below a given
        // LSN has been committed; some of it may appear in a later log.
        // Reclaiming the WAL at that point is therefore potentially risky.
        {
                // write-ahead log
                if (lsv_feature & LSV_FEATURE_WAL2) {
                        ret = lsv_wal2_segment_append(lsv_info, flatbuf);
                        if (unlikely(ret)) {
                                DERROR("ret %d\n", ret);
                                GOTO(ERR2, ret);
                        }
                }

                // _gettimeofday(&t4, NULL);
                // used3 = _time_used(&t3, &t4);

#if WBUF_USE_PREPARE

#if 0
                // serialize across IOs
                io_mq_ctx_t *io_mq_ctx;

                // @malloc free'd in other place
                ret = ymalloc((void **)&io_mq_ctx, sizeof(io_mq_ctx_t));
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                io_mq_ctx->lsv_info = lsv_info;
                io_mq_ctx->io_head = wbuf_io_head;
                co_cond_init(&io_mq_ctx->cond);

                co_mq_offer(&wbuf->io_mq, "write_io", __do_write_io_fill, io_mq_ctx, 1);
                co_cond_wait(&io_mq_ctx->cond);
#else
                // Placed after the WAL append -- is that an essential requirement of sequential consistency?
                ret = lsv_wbuffer_chunk_fill_v2(lsv_info, wbuf_io_head);
                if (unlikely(ret)) {
                        DERROR("ret %d\n", ret);
                        GOTO(ERR2, ret);
                }
#endif

#else

#if 0
                // Cannot be placed after the WAL: LSNs in chunk_post_commit would be
                // reordered (larger ones first, smaller ones after).
                {
                        // serialize across IOs
                        io_mq_ctx_t *io_mq_ctx;

                        // @malloc free'd in other place
                        ret = ymalloc((void **)&io_mq_ctx, sizeof(io_mq_ctx_t));
                        if (unlikely(ret)) {
                                YASSERT(0);
                        }

                        io_mq_ctx->lsv_info = lsv_info;
                        io_mq_ctx->io = io;
                        io_mq_ctx->buf = buf;
                        co_cond_init(&io_mq_ctx->cond);

                        co_mq_offer(&wbuf->io_mq, "write_io", __do_write_io, io_mq_ctx, 1);
                        co_cond_wait(&io_mq_ctx->cond);
                }
#else
                // concurrent across IOs
                ret = volume_proto_mem_append_io(lsv_info, io, buf);
                if (unlikely(ret)) {
                        GOTO(ERR1, ret);
                }
#endif

#endif
        }

#if 0
        // _gettimeofday(&t5, NULL);
        // used4 = _time_used(&t4, &t5);

        DERROR("io %p size %u used %ju %ju %ju %ju total %ju enter_count %u\n",
               io, io->size,
               used1, used2, used3, used4,
               (used1+used2+used3+used4),
               lsv_info->share.write_enter_count);
#endif

        if (wbuf_io_head) {
                yfree((void **)&wbuf_io_head);
        }

        if (flatbuf) {
                yfree((void **)&flatbuf);
        }

        // stage 4: make the data visible by updating the index; nothing after this may be interrupted

        // return
        lsv_info->share.write_enter_count--;
        return 0;
ERR2:
        if (wbuf_io_head) {
                yfree((void **)&wbuf_io_head);
        }
ERR1:
        if (flatbuf) {
                yfree((void **)&flatbuf);
        }

        lsv_info->share.write_enter_count--;
        return ret;
}

/**
 * Entry point for a write to an LSV volume.
 *
 * Holds lsv_info->io_lock for reading for the duration of the write.  While
 * the write is in flight it is traced on lsv_info->io_stream; the trace entry
 * is removed whether the write succeeds or fails.  The work itself is
 * delegated to volume_proto_write_lsv_i().  On success write_count is bumped
 * and lsv_info_stat() is called after the lock is released.
 *
 * NOTE(review): @io and @buf are const here, but volume_proto_write_lsv_i()
 * assigns io->lsn and reads data out of buf (volume_proto_mem_append_io()
 * asserts buf->len shrinks).  The casts rely on the caller's objects not being
 * defined const.
 *
 * @return 0 on success, otherwise the error from volume_proto_write_lsv_i().
 */
int lsv_volume_proto_write(volume_proto_t *volume_proto, const io_t *io, const buffer_t *buf) {
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        lsv_io_entry_t *trace;
        int ret;

        YASSERT(io->size == buf->len);

        lsv_rdlock(&lsv_info->io_lock);

        // Trace the in-flight IO.
        ret = ymalloc((void **)&trace, sizeof(lsv_io_entry_t));
        if (unlikely(ret)) {
                YASSERT(0);
                GOTO(err_unlock, ret);
        }

        trace->io = io;
        trace->buffer = buf;
        count_list_add_tail(&trace->hook, &lsv_info->io_stream);

        DINFO("vol %ju io_stream %u\n", lsv_info->ino, lsv_info->io_stream.count);
#if DEBUG_MODE
        lsv_info_print_io_entry(lsv_info, trace);
#endif

        ret = volume_proto_write_lsv_i(lsv_info, (io_t *)io, (buffer_t *)buf);
        if (unlikely(ret))
                GOTO(err_untrace, ret);

        // Untrace the IO.
        count_list_del_init(&trace->hook, &lsv_info->io_stream);
        yfree((void **)&trace);

        lsv_unlock(&lsv_info->io_lock);

        lsv_info->write_count++;

        // @WHY: between LSN assignment and this point, LSN order is no longer preserved.
        lsv_info_stat(lsv_info);

        return 0;

err_untrace:
        count_list_del_init(&trace->hook, &lsv_info->io_stream);
        yfree((void **)&trace);
err_unlock:
        lsv_unlock(&lsv_info->io_lock);
        return ret;
}
